/*
 * @Author: Wangjun
 * @Date: 2021-07-12 16:36:13
 * @LastEditTime: 2023-11-21 14:05:51
 * @LastEditors: fuzhuang
 * @Description:clickhouse group数组
 * @FilePath: \xr_node_calcd:\go\src\gitee.com\haodreams\golib\ck\ck_groups.go
 * hnxr
 */

package ck

import (
	"errors"
	"strings"
	"sync/atomic"

	"gitee.com/haodreams/golib/gensql"
	"gitee.com/haodreams/golib/logs"
	"gitee.com/haodreams/golib/wrap"
	"gitee.com/haodreams/libs/config"
	"gitee.com/haodreams/libs/ee"
	"gorm.io/gorm"
)

type CKGroups struct {
	groups []*CKGroup
	Num    int //group 分库数
	ckPos  uint32
	enable bool
}

func (m *CKGroups) getIdx() int {
	i := atomic.AddUint32(&m.ckPos, 1)
	i = uint32(int(i) % m.Num)
	return int(i)
}

// Setup 初始化配置
func (m *CKGroups) Setup(conf config.Configer) (err error) {
	err = m.Init(conf)
	if err != nil {
		return ee.New(err)
	}
	return
}

/**
 * @description: host11,host12;host21,host22 ,group用分号分隔,host用逗号分隔
 * @param {config.Configer} conf
 * @return {*}
 */
func (m *CKGroups) Init(conf config.Configer) (err error) {
	m.enable = conf.DefaultBool("ck_enable", false)
	if !m.enable {
		return
	}
	host := conf.String("ck_group")
	if host == "" {
		err = errors.New("没有配置clickhouse数据源服务器,关闭使用clickhouse功能")
		return
	}

	datasource := conf.String("ck_dsn")
	if datasource == "" {
		err = errors.New("没有配置clickhouse DSN")
		return
	}

	tablePrefix := conf.String("ck_table_prefix")
	hosts := strings.Split(host, ";")
	for i := range hosts {
		g := NewCKGroup(hosts[i], datasource, tablePrefix)
		if g != nil {
			m.groups = append(m.groups, g)
		}
	}
	if len(m.groups) == 0 {
		err = errors.New("没有有效的clickhouse group")
		return
	}
	m.Num = len(m.groups[0].dbs)
	for _, g := range m.groups {
		if m.Num > len(g.dbs) {
			m.Num = len(g.dbs)
		}
	}
	if m.Num == 0 {
		err = errors.New("没有配置clickhouse地址")
	}
	return
}

/**
 * @description: host11,host12;host21,host22 ,group用分号分隔,host用逗号分隔
 * @param {config.Configer} conf
 * @return {*}
 */
func (m *CKGroups) InitArray(data []string) (ck *CKGroups, err error) {
	host := data[1]
	if host == "" {
		err = errors.New("没有配置clickhouse数据源服务器,关闭使用clickhouse功能")
		return
	}

	datasource := data[2]
	if datasource == "" {
		err = errors.New("没有配置clickhouse DSN")
		return
	}

	hosts := strings.Split(host, ";")
	for i := range hosts {
		g := NewCKGroup(hosts[i], datasource)
		if g != nil {
			m.groups = append(m.groups, g)
		}
	}
	if len(m.groups) == 0 {
		err = errors.New("没有有效的clickhouse group")
		return
	}
	m.Num = len(m.groups[0].dbs)
	for _, g := range m.groups {
		if m.Num > len(g.dbs) {
			m.Num = len(g.dbs)
		}
	}
	if m.Num == 0 {
		err = errors.New("没有配置clickhouse地址")
	}
	return m, err
}

/**
 * @description: 获取组数量
 * @param {*}
 * @return {*}
 */
func (m *CKGroups) GetGroupCount() int {
	return len(m.groups)
}

/**
 * @description: 并发写入clickhouse数据
 * @param {interface{}} rows 对象数组
 * 不判断enable
 * @return {*}
 */
func (m *CKGroups) InsertNoEnable(sql string, rows [][]interface{}) (count int, validGroup int64, err error) {
	idx := m.getIdx()
	num := len(m.groups)
	//now := time.Now()
	// defer func() {
	// 	logs.Info("group num:", num, "valild group", validGroup, "insert num:", count, "used time:", time.Since(now))
	// }()
	if num == 1 {
		count, err = m.groups[0].insert(idx, sql, rows)
		if err != nil {
			logs.Warn(err)
		} else {
			validGroup = 1
		}
		return
	}
	//并发执行
	wg := wrap.NewWaitGroupWrapper(num)
	for i := range m.groups {
		//重新赋值防止闭包的引用错误
		group := m.groups[i]
		wg.Wrap(func() {
			var er error
			count, er = group.insert(idx, sql, rows)
			if er != nil {
				err = er
				logs.Warn(err)
			} else {
				atomic.AddInt64(&validGroup, 1)
			}
		})
	}
	wg.Wait()
	return
}

/**
 * @description: 并发写入clickhouse数据
 * @param {interface{}} rows 对象数组
 * @return {*}
 */
func (m *CKGroups) Insert(sql string, rows [][]interface{}) (count int, validGroup int64, err error) {
	if !m.enable {
		return
	}
	idx := m.getIdx()
	num := len(m.groups)
	//now := time.Now()
	// defer func() {
	// 	logs.Info("group num:", num, "valild group", validGroup, "insert num:", count, "used time:", time.Since(now))
	// }()
	if num == 1 {
		count, err = m.groups[0].insert(idx, sql, rows)
		if err != nil {
			logs.Warn(err)
		} else {
			validGroup = 1
		}
		return
	}
	//并发执行
	wg := wrap.NewWaitGroupWrapper(num)
	for i := range m.groups {
		//重新赋值防止闭包的引用错误
		group := m.groups[i]
		wg.Wrap(func() {
			var er error
			count, er = group.insert(idx, sql, rows)
			if er != nil {
				err = er
				logs.Warn(err)
			} else {
				atomic.AddInt64(&validGroup, 1)
			}
		})
	}
	wg.Wait()
	return
}

/**
 * @description: 查找全部
 * @param {interface{}} dest 数组地址
 * @param {...interface{}} conds
 * @return {*}
 */
func (m *CKGroups) Find(dest interface{}, conds ...interface{}) (err error) {
	if !m.enable {
		return
	}
	idx := m.getIdx()
	return m.groups[0].Find(idx, dest, conds...)
}

/**
 * @description: 使用原生sql 查找一行
 * @param {interface{}} dest
 * @param {string} sql
 * @param {...interface{}} values
 * @return {*}
 */
func (m *CKGroups) RawScan(dest interface{}, sql string, values ...interface{}) (err error) {
	if !m.enable {
		return
	}
	idx := m.getIdx()
	return m.groups[0].RawScan(idx, dest, sql, values...)
}

/**
 * @description: 使用原生sql 查找全部
 * @param {interface{}} dest
 * @param {string} sql
 * @param {...interface{}} values
 * @return {*}
 */
func (m *CKGroups) RawFind(dest interface{}, sql string, values ...interface{}) (err error) {
	if !m.enable {
		return
	}
	idx := m.getIdx()
	return m.groups[0].RawFind(idx, dest, sql, values...)
}

func (m *CKGroups) GetDB() (db *gorm.DB) {
	if len(m.groups) > 0 {
		return m.groups[0].GetDB()
	}
	return nil
}

/*
*公用方法-写入clickhouse数据库
*param(clickhouse连接,json数组)
*return(变化行数，错误信息)
 */
func (m *CKGroups) InsertObjects(rows interface{}) (count int, validGroup int64, err error) {
	if !m.enable {
		return
	}
	db := m.GetDB()
	if db == nil {
		err = errors.New("没有有效的数据库对象")
		return
	}
	sql, records, err := gensql.GenSQLData(db, rows)
	if err != nil {
		return
	}

	if len(records) == 0 {
		return
	}

	return m.Insert(sql, records)
}

/*
*公用方法-写入clickhouse数据库
*不判断enable
*param(clickhouse连接,json数组)
*return(变化行数，错误信息)
 */
func (m *CKGroups) InsertObjectsNoEnable(rows interface{}) (count int, validGroup int64, err error) {
	db := m.GetDB()
	if db == nil {
		err = errors.New("没有有效的数据库对象")
		return
	}
	sql, records, err := gensql.GenSQLData(db, rows)
	if err != nil {
		return
	}

	if len(records) == 0 {
		return
	}

	return m.InsertNoEnable(sql, records)
}

/**
 * @description: 每个主机上都执行sql命令
 * @param {string} sql
 * @return {*}
 */
func (m *CKGroups) Exec(sql string) (err error) {
	if !m.enable {
		return
	}
	for i, group := range m.groups {
		er := group.exec(sql)
		if er != nil {
			logs.Error("group:", i+1, er)
			if err == nil {
				err = er
			}
			continue
		}
	}
	return
}
